# Using dbt with Dagster

This guide explains how you can run a [dbt](https://docs.getdbt.com/docs/introduction) project in
your Dagster pipelines.

## What is dbt?

[dbt](https://docs.getdbt.com/docs/introduction) (data build tool) helps engineers transform
data in their warehouses by simply writing `SELECT` statements. dbt automatically builds a
dependency graph for your transformations and turns these `SELECT` statements into tables and views
in your data warehouse.

dbt not only runs your data transformations, but can also create data quality tests and generate
documentation for your data, right out of the box. To learn more about dbt, visit
[the official dbt documentation website](https://docs.getdbt.com/docs/introduction).

## How does dbt work with Dagster?

Dagster orchestrates dbt alongside _other technologies_, so you can combine dbt with Spark, Python,
etc. in a single pipeline. Dagster also provides built-in operational and data observability
capabilities, like storing dbt run results longitudinally and sending alerts when a dbt run fails.

`dagster-dbt` is an integration library that provides pre-built solids and resources for using dbt
together with Dagster. The solids in the library are designed to be configurable for any dbt
project.

The solids in `dagster-dbt` follow this naming convention:

- `dbt_cli_*` will invoke the dbt command via the dbt CLI. See [Use the dbt CLI in a Dagster pipeline](#use-the-dbt-cli-in-a-dagster-pipeline) for details.
- `dbt_rpc_*` will send a request to a dbt RPC server to run a dbt command and return immediately
  with a request token. See [Use the dbt RPC Server in a Dagster pipeline](#use-the-dbt-rpc-server-in-a-dagster-pipeline) for details.
- `dbt_rpc_*_and_wait` will send a request to a dbt RPC server to run a dbt command and poll the
  server until execution is completed or a timeout has been reached. See [Use the dbt RPC Server in a Dagster pipeline](#use-the-dbt-rpc-server-in-a-dagster-pipeline) for details.
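
As a quick way to internalize the convention, the sketch below classifies a solid name by its prefix and suffix. The helper `describe_dbt_solid` is hypothetical and is not part of `dagster-dbt`; it only restates the three rules above in code.

```python
def describe_dbt_solid(solid_name: str) -> str:
    """Classify a solid name using the dagster-dbt naming convention.

    Hypothetical helper for illustration only; not part of dagster-dbt.
    """
    if solid_name.startswith("dbt_cli_"):
        return "invokes the dbt CLI"
    if solid_name.startswith("dbt_rpc_"):
        # The `_and_wait` suffix marks solids that poll until completion.
        if solid_name.endswith("_and_wait"):
            return "sends an RPC request and polls until done"
        return "sends an RPC request and returns a request token"
    raise ValueError(f"{solid_name!r} does not follow the dbt_cli_*/dbt_rpc_* convention")


for name in ("dbt_cli_run", "dbt_rpc_run", "dbt_rpc_run_and_wait"):
    print(f"{name}: {describe_dbt_solid(name)}")
```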

## Use the dbt CLI in a Dagster pipeline

`dagster-dbt` provides solids for running commands through the
[dbt CLI](https://docs.getdbt.com/dbt-cli/cli-overview). By convention, these solids are named
`dagster_dbt.dbt_cli_*`.

To run the dbt CLI, your dbt project directory must be on your local filesystem and you must have a
dbt profile already set up to connect to your data warehouse. Visit
[the official dbt CLI documentation](https://docs.getdbt.com/dbt-cli/cli-overview) for more details.

Here are some examples of how to invoke `dbt run` with the solid `dagster_dbt.dbt_cli_run`. Other
dbt commands can be invoked via the CLI with their respective solid `dagster_dbt.dbt_cli_*`.

**Example:** The solid `dbt_cli_run` is configured to run your entire dbt project.

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_run endbefore=end_marker_dbt_cli_run dedent=4
from dagster import pipeline
from dagster_dbt import dbt_cli_run

config = {"project-dir": "path/to/dbt/project"}
run_all_models = dbt_cli_run.configured(config, name="run_dbt_project")

@pipeline
def my_dbt_pipeline():
    run_all_models()
```

**Example:** The solid `dbt_cli_run` is configured to run specific models in your dbt project. This
is similar to invoking `dbt run --models tag:staging`.

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_run_specific_models endbefore=end_marker_dbt_cli_run_specific_models dedent=4
from dagster import pipeline
from dagster_dbt import dbt_cli_run

config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
run_staging_models = dbt_cli_run.configured(config, name="run_staging_models")

@pipeline
def my_dbt_pipeline():
    run_staging_models()
```

In the code snippet above, the config `"models"` takes a list of strings. The string `"tag:staging"`
uses [dbt's node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax) to
filter models with the tag `"staging"`. For more details,
[visit the official dbt documentation on the node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax).
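
To make the correspondence between solid config and the dbt command line concrete, here is a minimal sketch that turns a config dictionary into an approximate argument list. The function `sketch_dbt_command` is hypothetical and only covers the two config keys used above; it is not how `dagster-dbt` builds its command internally.

```python
import shlex
from typing import Any, Dict, List


def sketch_dbt_command(command: str, config: Dict[str, Any]) -> List[str]:
    """Build an approximate dbt CLI argument list from a solid-style config.

    Hypothetical illustration; handles only "project-dir" and "models".
    """
    args = ["dbt", command]
    if "project-dir" in config:
        args += ["--project-dir", config["project-dir"]]
    if config.get("models"):
        # Each entry in the list becomes one selector after --models.
        args += ["--models", *config["models"]]
    return args


config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
# Print the command as a single shell-quoted string.
print(shlex.join(sketch_dbt_command("run", config)))
```

Because `"models"` is a list, adding a second selector such as `"my_dbt_model+"` simply appends another argument after `--models`.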

## Use the dbt RPC Server in a Dagster pipeline

`dagster-dbt` provides solids for running commands through the
[dbt RPC server](https://docs.getdbt.com/reference/commands/rpc). By convention, these solids are
named `dagster_dbt.dbt_rpc_*`.

Your dbt RPC server can be running locally or remotely. To use the dbt RPC solids in your Dagster
pipeline, you will need to create a resource for your dbt RPC server. To learn more about Dagster
resources, visit the [Resources Overview](/concepts/modes-resources).

`dagster_dbt.dbt_rpc_resource` can be configured with your specific host and port.

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_resource endbefore=end_marker_dbt_rpc_resource dedent=4
from dagster_dbt import dbt_rpc_resource

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})
```

For convenience during local development, you may also use `dagster_dbt.local_dbt_rpc_resource`,
which is preconfigured for a dbt RPC server that is running on `http://localhost:8580`.

Here are some examples of how to send dbt commands to a dbt RPC server with a solid.

**Example:** The solid `dbt_rpc_run` will send a request to run your entire dbt project when you
don't use any solid configuration.

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run endbefore=end_marker_dbt_rpc_run dedent=4
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    dbt_rpc_run()
```

The code snippet above shows a Dagster pipeline with a single solid `dbt_rpc_run`. The solid
`dbt_rpc_run` has a required resource key `"dbt_rpc"`. So, any pipeline that uses `dbt_rpc_run` will
need a [ModeDefinition](/concepts/modes-resources) that defines a resource under the
key `"dbt_rpc"`.

**Example:** The solid `dbt_rpc_run` is configured to run specific models in a dbt project. This is
similar to having `"params": {"models": "tag:staging"}` in your dbt RPC request body.

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run_specific_models endbefore=end_marker_dbt_rpc_run_specific_models dedent=4
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

run_staging_models = dbt_rpc_run.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models",
)

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    run_staging_models()
```

Note that the solid above will NOT wait until the dbt RPC server has finished executing your
request. Instead, it will return immediately with a request token from the dbt RPC server. If you
want the solid to wait until execution is finished, use `dagster_dbt.dbt_rpc_run_and_wait`.
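
For reference, the request body mentioned in the staging-models example might look roughly like the sketch below. It assumes the dbt RPC server accepts a JSON-RPC 2.0 envelope with a `run` method and a space-delimited `models` string; the helper `sketch_rpc_run_body` is hypothetical, so check the dbt RPC documentation for the exact fields.

```python
import json
import uuid
from typing import List


def sketch_rpc_run_body(models: List[str]) -> str:
    """Serialize an approximate dbt RPC `run` request body.

    Hypothetical illustration; the envelope fields are assumptions based on
    JSON-RPC 2.0 and are not taken from dagster-dbt.
    """
    body = {
        "jsonrpc": "2.0",
        "method": "run",
        "id": str(uuid.uuid4()),  # Client-chosen identifier for this request.
        # Assumption: selectors are joined into one space-delimited string.
        "params": {"models": " ".join(models)},
    }
    return json.dumps(body)


print(sketch_rpc_run_body(["tag:staging"]))
```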

**Example:** The solid `dbt_rpc_run_and_wait` will send a request to run your entire dbt project
and then poll the dbt RPC server until it has finished executing your request.

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run_and_wait endbefore=end_marker_dbt_rpc_run_and_wait dedent=4
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run_and_wait

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    dbt_rpc_run_and_wait()
```
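
The poll-until-done behavior can be pictured with a small standalone loop. This is only a sketch of the general pattern, not the `dagster-dbt` implementation: `poll_until_done`, the `"running"` and `"success"` state names, and the default values are all assumptions made for illustration.

```python
import time
from typing import Callable


def poll_until_done(
    get_state: Callable[[], str],
    interval: float = 10.0,
    timeout: float = 60.0,
) -> str:
    """Call get_state() every `interval` seconds until it stops returning
    "running", or raise TimeoutError once `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state != "running":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still running after {timeout} seconds")
        time.sleep(interval)


# Hypothetical stand-in for a status request to the dbt RPC server.
fake_states = iter(["running", "running", "success"])
print(poll_until_done(lambda: next(fake_states), interval=0.1))
```

A shorter interval surfaces completion sooner at the cost of more requests to the server.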

## Use dbt Cloud in a Dagster pipeline

`dagster-dbt` does not currently provide solids for invoking dbt commands via dbt Cloud. However,
you can support this use case by writing your own solid that creates and starts jobs via
[the dbt Cloud API](https://docs.getdbt.com/docs/dbt-cloud/cloud-api). For more details about each
HTTP endpoint,
[visit the official documentation for the dbt Cloud API](https://docs.getdbt.com/dbt-cloud/api).
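
As a starting point for such a solid, the sketch below builds (but does not send) an HTTP request that would trigger a dbt Cloud job. The endpoint path, the `Token` authorization scheme, and the `cause` field are assumptions based on version 2 of the dbt Cloud API, and `build_trigger_job_request` is a hypothetical helper; verify all of them against the dbt Cloud API documentation before relying on this.

```python
import json
import urllib.request


def build_trigger_job_request(
    account_id: int, job_id: int, api_token: str, cause: str
) -> urllib.request.Request:
    """Build a POST request that asks dbt Cloud to start a job run.

    Hypothetical illustration; the URL and payload shape are assumptions.
    """
    url = f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    payload = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={
            "Authorization": f"Token {api_token}",
            "Content-Type": "application/json",
        },
    )


request = build_trigger_job_request(123, 456, "my-api-token", "Triggered by Dagster")
print(request.get_method(), request.full_url)
```

Inside your own solid, you could pass the request to `urllib.request.urlopen` and return the parsed response for downstream solids to use.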

## Advanced Configuration

For full documentation on all available config,
[visit the API docs for dagster-dbt](/\_apidocs/libraries/dagster_dbt).

**dbt CLI: Set the dbt profile and target to load**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_profile_and_target endbefore=end_marker_dbt_cli_config_profile_and_target dedent=4
config = {"profile": PROFILE_NAME, "target": TARGET_NAME}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")
```

**dbt CLI: Set the path to the dbt executable**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_executable endbefore=end_marker_dbt_cli_config_executable dedent=4
config = {"dbt_executable": "path/to/dbt/executable"}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")
```

**dbt CLI: Select specific models to run**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_select_models endbefore=end_marker_dbt_cli_config_select_models dedent=4
config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")
```

For more details, [visit the official documentation on dbt's node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax).

**dbt CLI: Exclude specific models**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_exclude_models endbefore=end_marker_dbt_cli_config_exclude_models dedent=4
config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")
```

For more details, [visit the official documentation on dbt's node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax).

**dbt CLI: Set key-values for dbt vars**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_vars endbefore=end_marker_dbt_cli_config_vars dedent=4
config = {"vars": {"key": "value"}}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")
```

For more details, [visit the official documentation on using variables in dbt](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/using-variables).
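
On the dbt command line, variables are passed as a single `--vars` argument. The sketch below shows how a `"vars"` dictionary could be serialized into that argument; `sketch_vars_args` is a hypothetical helper, and the JSON encoding is an assumption about one workable format rather than a description of what `dagster-dbt` does internally.

```python
import json
import shlex
from typing import Any, Dict, List


def sketch_vars_args(variables: Dict[str, Any]) -> List[str]:
    """Serialize a vars dictionary into an approximate `--vars` argument pair.

    Hypothetical illustration; not part of dagster-dbt.
    """
    # JSON is used here because the dictionary maps onto it directly.
    return ["--vars", json.dumps(variables)]


# Print the full command as a single shell-quoted string.
print(shlex.join(["dbt", "run", *sketch_vars_args({"key": "value"})]))
```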

**dbt CLI: Disable default asset materializations**

```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_config_disable_assets endbefore=end_marker_dbt_cli_config_disable_assets dedent=4
config = {"yield_materializations": False}

from dagster_dbt import dbt_cli_run

custom_solid = dbt_cli_run.configured(config, name="custom_solid")
```

**dbt RPC: Configure a remote dbt RPC resource**

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_resource_example endbefore=end_marker_dbt_rpc_resource_example dedent=4
from dagster_dbt import dbt_rpc_resource

custom_resource = dbt_rpc_resource.configured({"host": HOST, "port": PORT})
```

**dbt RPC: Select specific models to run**

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_config_select_models endbefore=end_marker_dbt_rpc_config_select_models dedent=4
config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")
```

For more details, [visit the official documentation on dbt's node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax).

**dbt RPC: Exclude specific models**

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_config_exclude_models endbefore=end_marker_dbt_rpc_config_exclude_models dedent=4
config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")
```

For more details, [visit the official documentation on dbt's node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax).

**dbt RPC: Configure polling interval when using a `dbt_rpc_*_and_wait` solid**

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_and_wait_config_polling_interval endbefore=end_marker_dbt_rpc_and_wait_config_polling_interval dedent=4
config = {"interval": 3}  # Poll the dbt RPC server every 3 seconds.

from dagster_dbt import dbt_rpc_run_and_wait

custom_solid = dbt_rpc_run_and_wait.configured(config, name="custom_solid")
```

**dbt RPC: Disable default asset materializations**

```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_config_disable_assets endbefore=end_marker_dbt_rpc_config_disable_assets dedent=4
config = {"yield_materializations": False}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")
```

## Conclusion

If you find a bug or want to add a feature to the `dagster-dbt` library, we invite you to
[contribute](/community/contributing).

If you have questions on using dbt with Dagster, we'd love to hear from you:

<p align="center">
  <a href="https://dagster-slackin.herokuapp.com/" target="_blank">
    <img
      src="https://user-images.githubusercontent.com/609349/63558739-f60a7e00-c502-11e9-8434-c8a95b03ce62.png"
      width="160px"
    />
  </a>
</p>
